package org.niit.model

import org.apache.spark.SparkContext
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


object ALSMovieDemoTest {
  def main(args: Array[String]): Unit = {
    //TODO 0.准备环境
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("ALSMovieModeling")
      .config("spark.local.dir", "temp")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    //TODO 1.加载数据并处理
    val fileDS: Dataset[String] = spark.read.textFile("input/u.data")
    val ratingDF: DataFrame = fileDS.map(line => {
      val arr: Array[String] = line.split("\t")
      (arr(0).toInt, arr(1).toInt, arr(2).toDouble)
    }).toDF("userId", "movieId", "score")
    /*
    显示评分：对该产品的直接评分，例如，对电影的评分，对外卖评分  直接存进数据库
    隐示评分：短视频。观看3分钟：1分  点赞：2分 收藏：3分 评论：4分
              1：观看三分钟  2：点赞  3：收藏
     */

    val Array(trainSet,testSet) = ratingDF.randomSplit(Array(0.8,0.2))//按照8:2划分训练集和测试集
    //trainSet:训练集的结果
    //testSet：测试集的结果

    //TODO 2.构建ALS推荐算法模型并训练
    val als: ALS = new ALS()
      .setUserCol("userId") //设置用户id是哪一列
      .setItemCol("movieId") //设置产品id是哪一列
      .setRatingCol("score") //设置评分列
      .setRank(50) //这个值会影响矩阵分解的性能，越大则算法运行的时间和占用的内存可能会越多。通常需要进行调参，一般可以取10-200之间的数
      .setMaxIter(10) //最大迭代次数
      .setAlpha(1.0)//迭代步长
    /*
    userCol：用户列的名字，String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时用户id所在schema中的name
    itemCol：item列的名字，String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时item id所在schema中的name
    ratingCol：rating列的名字，String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时rating值所在schema中的name


     */

    //使用训练集训练模型
    val model: ALSModel = als.fit(trainSet).setColdStartStrategy("drop")
    /*
   coldStartStrategy：String类型。
   有两个取值"nan" or "drop"。
   这个参数指示用在prediction阶段时遇到未知或者新加入的user或item时的处理策略。
   尤其是在交叉验证或者生产场景中，遇到没有在训练集中出现的user/item id时。
   "nan"表示对于未知id的prediction结果为NaN。
   "drop"表示对于transform()的入参DataFrame中出现未知ids的行，
   将会在包含prediction的返回DataFrame中被drop。默认值是"nan"
    */

    //使用测试集测试模型
    //val testResult: DataFrame = model.recommendForAllUsers(5)

    //7.对测试集进行预测
    val predictions: DataFrame = model.transform(testSet.cache())

    //计算模型误差--模型评估
    val evaluator: RegressionEvaluator = new RegressionEvaluator()
      .setMetricName("rmse")//均方根误差
      .setLabelCol("score")
      .setPredictionCol("prediction") //predictionCol：String类型。做transform()操作时输出的预测值在Dataset<Row>结果的schema中的name，默认是“prediction”
    val rmse: Double = evaluator.evaluate(predictions)//均方根误差

    //显示训练集数据
    trainSet.foreach(x =>println("训练集： " + x))
    //显示测试集数据
    testSet.foreach(x => println("测试集： " + x))

    //打印预测结果
    predictions.foreach(x => println("预测结果:  " + x))
    //输出误差
    println("模型误差评估："  + rmse)

    if(rmse <=1.5){
      val path = "output/als_movie_model/" + System.currentTimeMillis()
      model.save(path)

      println("模型path信息已保存")
    }

    //TODO 3.给用户做推荐
    val result1: DataFrame = model.recommendForAllUsers(5)//给所有用户推荐5部电影
    val result2: DataFrame = model.recommendForAllItems(5)//给所有电影推荐5个用户
    //给指定用户推荐5部电影
    val result3: DataFrame = model.recommendForUserSubset(sc.makeRDD(Array(196)).toDF("userId"),5)
    val result4: DataFrame = model.recommendForItemSubset(sc.makeRDD(Array(242)).toDF("movieId"),5)

    result1.show(false)
    result2.show(false)
    result3.show(false)
    result4.show(false)
  }
}
